{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Preprocessing using a Dataflow\n",
    "\n",
    "This notebook illustrates:\n",
    "\n",
    "* Creating datasets for Machine Learning using Dataflow\n",
    "\n",
    "\n",
    "While Pandas is fine for experimenting, for operationalization of your workflow, it is better to do preprocessing in Apache Beam. This will also help if you need to preprocess data in flight, since Apache Beam also allows for streaming.\n",
    "\n",
    "Each learning objective will correspond to a __#TODO__ in this student lab notebook -- try to complete this notebook first and then review the [solution notebook](../solutions/preproc.ipynb).\n"
      ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
   "!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied: google-cloud-bigquery==3.4.1 in /home/jupyter/.local/lib/python3.7/site-packages (3.4.1)\n",
      "Requirement already satisfied: protobuf!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.19.5 in /opt/conda/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (3.19.6)\n",
      "Requirement already satisfied: google-cloud-core<3.0.0dev,>=1.4.1 in /home/jupyter/.local/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (1.7.3)\n",
      "Requirement already satisfied: grpcio<2.0dev,>=1.47.0 in /opt/conda/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (1.51.1)\n",
      "Requirement already satisfied: google-resumable-media<3.0dev,>=0.6.0 in /home/jupyter/.local/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (2.4.0)\n",
      "Requirement already satisfied: requests<3.0.0dev,>=2.21.0 in /opt/conda/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (2.28.1)\n",
      "Requirement already satisfied: python-dateutil<3.0dev,>=2.7.2 in /opt/conda/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (2.8.2)\n",
      "Requirement already satisfied: proto-plus<2.0.0dev,>=1.15.0 in /opt/conda/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (1.22.1)\n",
      "Requirement already satisfied: google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5 in /opt/conda/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (1.34.0)\n",
      "Requirement already satisfied: packaging<22.0.0dev,>=14.3 in /home/jupyter/.local/lib/python3.7/site-packages (from google-cloud-bigquery==3.4.1) (21.3)\n",
      "Requirement already satisfied: google-auth<3.0dev,>=1.25.0 in /home/jupyter/.local/lib/python3.7/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-cloud-bigquery==3.4.1) (1.35.0)\n",
      "Requirement already satisfied: googleapis-common-protos<2.0dev,>=1.56.2 in /opt/conda/lib/python3.7/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-cloud-bigquery==3.4.1) (1.57.0)\n",
      "Requirement already satisfied: grpcio-status<2.0dev,>=1.33.2 in /opt/conda/lib/python3.7/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-cloud-bigquery==3.4.1) (1.48.2)\n",
      "Requirement already satisfied: six>=1.12.0 in /opt/conda/lib/python3.7/site-packages (from google-cloud-core<3.0.0dev,>=1.4.1->google-cloud-bigquery==3.4.1) (1.16.0)\n",
      "Requirement already satisfied: google-crc32c<2.0dev,>=1.0 in /opt/conda/lib/python3.7/site-packages (from google-resumable-media<3.0dev,>=0.6.0->google-cloud-bigquery==3.4.1) (1.5.0)\n",
      "Requirement already satisfied: pyparsing!=3.0.5,>=2.0.2 in /opt/conda/lib/python3.7/site-packages (from packaging<22.0.0dev,>=14.3->google-cloud-bigquery==3.4.1) (3.0.9)\n",
      "Requirement already satisfied: charset-normalizer<3,>=2 in /opt/conda/lib/python3.7/site-packages (from requests<3.0.0dev,>=2.21.0->google-cloud-bigquery==3.4.1) (2.1.1)\n",
      "Requirement already satisfied: certifi>=2017.4.17 in /opt/conda/lib/python3.7/site-packages (from requests<3.0.0dev,>=2.21.0->google-cloud-bigquery==3.4.1) (2022.12.7)\n",
      "Requirement already satisfied: urllib3<1.27,>=1.21.1 in /opt/conda/lib/python3.7/site-packages (from requests<3.0.0dev,>=2.21.0->google-cloud-bigquery==3.4.1) (1.26.13)\n",
      "Requirement already satisfied: idna<4,>=2.5 in /opt/conda/lib/python3.7/site-packages (from requests<3.0.0dev,>=2.21.0->google-cloud-bigquery==3.4.1) (3.4)\n",
      "Requirement already satisfied: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.7/site-packages (from google-auth<3.0dev,>=1.25.0->google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-cloud-bigquery==3.4.1) (0.2.8)\n",
      "Requirement already satisfied: rsa<5,>=3.1.4 in /opt/conda/lib/python3.7/site-packages (from google-auth<3.0dev,>=1.25.0->google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-cloud-bigquery==3.4.1) (4.9)\n",
      "Requirement already satisfied: setuptools>=40.3.0 in /opt/conda/lib/python3.7/site-packages (from google-auth<3.0dev,>=1.25.0->google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-cloud-bigquery==3.4.1) (65.5.1)\n",
      "Requirement already satisfied: cachetools<5.0,>=2.0.0 in /home/jupyter/.local/lib/python3.7/site-packages (from google-auth<3.0dev,>=1.25.0->google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-cloud-bigquery==3.4.1) (4.2.4)\n",
      "Requirement already satisfied: pyasn1<0.5.0,>=0.4.6 in /opt/conda/lib/python3.7/site-packages (from pyasn1-modules>=0.2.1->google-auth<3.0dev,>=1.25.0->google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-cloud-bigquery==3.4.1) (0.4.8)\n"
     ]
    }
   ],
   "source": [
    "!pip install --user google-cloud-bigquery==3.4.1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Kindly ignore the deprecation warnings and incompatibility errors related to google-cloud-storage."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied: apache-beam[interactive]==2.43.0 in /home/jupyter/.local/lib/python3.7/site-packages (2.43.0)\n",
      "Requirement already satisfied: pydot<2,>=1.2.0 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (1.4.2)\n",
      "Requirement already satisfied: dill<0.3.2,>=0.3.1.1 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (0.3.1.1)\n",
      "Requirement already satisfied: numpy<1.23.0,>=1.14.3 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (1.21.6)\n",
      "Requirement already satisfied: pyarrow<10.0.0,>=0.15.1 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (0.17.1)\n",
      "Requirement already satisfied: zstandard<1,>=0.18.0 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (0.18.0)\n",
      "Requirement already satisfied: typing-extensions>=3.7.0 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (3.7.4.3)\n",
      "Requirement already satisfied: regex>=2020.6.8 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (2022.10.31)\n",
      "Requirement already satisfied: proto-plus<2,>=1.7.1 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (1.22.1)\n",
      "Requirement already satisfied: hdfs<3.0.0,>=2.1.0 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (2.7.0)\n",
      "Requirement already satisfied: pymongo<4.0.0,>=3.8.0 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (3.13.0)\n",
      "Requirement already satisfied: protobuf<4,>3.12.2 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (3.19.6)\n",
      "Requirement already satisfied: cloudpickle~=2.2.0 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (2.2.0)\n",
      "Requirement already satisfied: objsize<0.6.0,>=0.5.2 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (0.5.2)\n",
      "Requirement already satisfied: pytz>=2018.3 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (2022.6)\n",
      "Requirement already satisfied: python-dateutil<3,>=2.8.0 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (2.8.2)\n",
      "Requirement already satisfied: orjson<4.0 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (3.8.3)\n",
      "Requirement already satisfied: fastavro<2,>=0.23.6 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (0.23.6)\n",
      "Requirement already satisfied: requests<3.0.0,>=2.24.0 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (2.28.1)\n",
      "Requirement already satisfied: fasteners<1.0,>=0.3 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (0.18)\n",
      "Requirement already satisfied: crcmod<2.0,>=1.7 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (1.7)\n",
      "Requirement already satisfied: httplib2<0.21.0,>=0.8 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (0.17.4)\n",
      "Requirement already satisfied: grpcio!=1.48.0,<2,>=1.33.1 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (1.51.1)\n",
      "Requirement already satisfied: ipywidgets<8,>=7.6.5 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (7.7.2)\n",
      "Requirement already satisfied: timeloop<2,>=1.0.2 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (1.0.2)\n",
      "Requirement already satisfied: ipykernel<7,>=6 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (6.16.2)\n",
      "Requirement already satisfied: ipython<8,>=7 in /opt/conda/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (7.34.0)\n",
      "Requirement already satisfied: google-cloud-dataproc<3.2.0,>=3.0.0 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (3.1.1)\n",
      "Requirement already satisfied: jupyter-client<6.1.13,>=6.1.11 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (6.1.12)\n",
      "Requirement already satisfied: facets-overview<2,>=1.0.0 in /home/jupyter/.local/lib/python3.7/site-packages (from apache-beam[interactive]==2.43.0) (1.0.0)\n",
      "Requirement already satisfied: pandas>=0.22.0 in /opt/conda/lib/python3.7/site-packages (from facets-overview<2,>=1.0.0->apache-beam[interactive]==2.43.0) (1.3.5)\n",
      "Requirement already satisfied: google-api-core[grpc]<3.0.0dev,>=1.28.0 in /opt/conda/lib/python3.7/site-packages (from google-cloud-dataproc<3.2.0,>=3.0.0->apache-beam[interactive]==2.43.0) (1.34.0)\n",
      "Requirement already satisfied: docopt in /opt/conda/lib/python3.7/site-packages (from hdfs<3.0.0,>=2.1.0->apache-beam[interactive]==2.43.0) (0.6.2)\n",
      "Requirement already satisfied: six>=1.9.0 in /opt/conda/lib/python3.7/site-packages (from hdfs<3.0.0,>=2.1.0->apache-beam[interactive]==2.43.0) (1.16.0)\n",
      "Requirement already satisfied: packaging in /home/jupyter/.local/lib/python3.7/site-packages (from ipykernel<7,>=6->apache-beam[interactive]==2.43.0) (21.3)\n",
      "Requirement already satisfied: psutil in /opt/conda/lib/python3.7/site-packages (from ipykernel<7,>=6->apache-beam[interactive]==2.43.0) (5.9.3)\n",
      "Requirement already satisfied: pyzmq>=17 in /opt/conda/lib/python3.7/site-packages (from ipykernel<7,>=6->apache-beam[interactive]==2.43.0) (24.0.1)\n",
      "Requirement already satisfied: nest-asyncio in /opt/conda/lib/python3.7/site-packages (from ipykernel<7,>=6->apache-beam[interactive]==2.43.0) (1.5.6)\n",
      "Requirement already satisfied: tornado>=6.1 in /opt/conda/lib/python3.7/site-packages (from ipykernel<7,>=6->apache-beam[interactive]==2.43.0) (6.2)\n",
      "Requirement already satisfied: traitlets>=5.1.0 in /opt/conda/lib/python3.7/site-packages (from ipykernel<7,>=6->apache-beam[interactive]==2.43.0) (5.7.1)\n",
      "Requirement already satisfied: matplotlib-inline>=0.1 in /opt/conda/lib/python3.7/site-packages (from ipykernel<7,>=6->apache-beam[interactive]==2.43.0) (0.1.6)\n",
      "Requirement already satisfied: debugpy>=1.0 in /opt/conda/lib/python3.7/site-packages (from ipykernel<7,>=6->apache-beam[interactive]==2.43.0) (1.6.4)\n",
      "Requirement already satisfied: backcall in /opt/conda/lib/python3.7/site-packages (from ipython<8,>=7->apache-beam[interactive]==2.43.0) (0.2.0)\n",
      "Requirement already satisfied: pickleshare in /opt/conda/lib/python3.7/site-packages (from ipython<8,>=7->apache-beam[interactive]==2.43.0) (0.7.5)\n",
      "Requirement already satisfied: pexpect>4.3 in /opt/conda/lib/python3.7/site-packages (from ipython<8,>=7->apache-beam[interactive]==2.43.0) (4.8.0)\n",
      "Requirement already satisfied: setuptools>=18.5 in /opt/conda/lib/python3.7/site-packages (from ipython<8,>=7->apache-beam[interactive]==2.43.0) (65.5.1)\n",
      "Requirement already satisfied: decorator in /opt/conda/lib/python3.7/site-packages (from ipython<8,>=7->apache-beam[interactive]==2.43.0) (5.1.1)\n",
      "Requirement already satisfied: pygments in /opt/conda/lib/python3.7/site-packages (from ipython<8,>=7->apache-beam[interactive]==2.43.0) (2.13.0)\n",
      "Requirement already satisfied: jedi>=0.16 in /opt/conda/lib/python3.7/site-packages (from ipython<8,>=7->apache-beam[interactive]==2.43.0) (0.18.2)\n",
      "Requirement already satisfied: prompt-toolkit!=3.0.0,!=3.0.1,<3.1.0,>=2.0.0 in /opt/conda/lib/python3.7/site-packages (from ipython<8,>=7->apache-beam[interactive]==2.43.0) (3.0.36)\n",
      "Requirement already satisfied: ipython-genutils~=0.2.0 in /opt/conda/lib/python3.7/site-packages (from ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.2.0)\n",
      "Requirement already satisfied: jupyterlab-widgets<3,>=1.0.0 in /home/jupyter/.local/lib/python3.7/site-packages (from ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.1.1)\n",
      "Requirement already satisfied: widgetsnbextension~=3.6.0 in /home/jupyter/.local/lib/python3.7/site-packages (from ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (3.6.1)\n",
      "Requirement already satisfied: jupyter-core>=4.6.0 in /opt/conda/lib/python3.7/site-packages (from jupyter-client<6.1.13,>=6.1.11->apache-beam[interactive]==2.43.0) (4.12.0)\n",
      "Requirement already satisfied: pyparsing>=2.1.4 in /opt/conda/lib/python3.7/site-packages (from pydot<2,>=1.2.0->apache-beam[interactive]==2.43.0) (3.0.9)\n",
      "Requirement already satisfied: certifi>=2017.4.17 in /opt/conda/lib/python3.7/site-packages (from requests<3.0.0,>=2.24.0->apache-beam[interactive]==2.43.0) (2022.12.7)\n",
      "Requirement already satisfied: charset-normalizer<3,>=2 in /opt/conda/lib/python3.7/site-packages (from requests<3.0.0,>=2.24.0->apache-beam[interactive]==2.43.0) (2.1.1)\n",
      "Requirement already satisfied: urllib3<1.27,>=1.21.1 in /opt/conda/lib/python3.7/site-packages (from requests<3.0.0,>=2.24.0->apache-beam[interactive]==2.43.0) (1.26.13)\n",
      "Requirement already satisfied: idna<4,>=2.5 in /opt/conda/lib/python3.7/site-packages (from requests<3.0.0,>=2.24.0->apache-beam[interactive]==2.43.0) (3.4)\n",
      "Requirement already satisfied: google-auth<3.0dev,>=1.25.0 in /home/jupyter/.local/lib/python3.7/site-packages (from google-api-core[grpc]<3.0.0dev,>=1.28.0->google-cloud-dataproc<3.2.0,>=3.0.0->apache-beam[interactive]==2.43.0) (1.35.0)\n",
      "Requirement already satisfied: googleapis-common-protos<2.0dev,>=1.56.2 in /opt/conda/lib/python3.7/site-packages (from google-api-core[grpc]<3.0.0dev,>=1.28.0->google-cloud-dataproc<3.2.0,>=3.0.0->apache-beam[interactive]==2.43.0) (1.57.0)\n",
      "Requirement already satisfied: grpcio-status<2.0dev,>=1.33.2 in /opt/conda/lib/python3.7/site-packages (from google-api-core[grpc]<3.0.0dev,>=1.28.0->google-cloud-dataproc<3.2.0,>=3.0.0->apache-beam[interactive]==2.43.0) (1.48.2)\n",
      "Requirement already satisfied: parso<0.9.0,>=0.8.0 in /opt/conda/lib/python3.7/site-packages (from jedi>=0.16->ipython<8,>=7->apache-beam[interactive]==2.43.0) (0.8.3)\n",
      "Requirement already satisfied: ptyprocess>=0.5 in /opt/conda/lib/python3.7/site-packages (from pexpect>4.3->ipython<8,>=7->apache-beam[interactive]==2.43.0) (0.7.0)\n",
      "Requirement already satisfied: wcwidth in /opt/conda/lib/python3.7/site-packages (from prompt-toolkit!=3.0.0,!=3.0.1,<3.1.0,>=2.0.0->ipython<8,>=7->apache-beam[interactive]==2.43.0) (0.2.5)\n",
      "Requirement already satisfied: notebook>=4.4.1 in /opt/conda/lib/python3.7/site-packages (from widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (6.5.2)\n",
      "Requirement already satisfied: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.7/site-packages (from google-auth<3.0dev,>=1.25.0->google-api-core[grpc]<3.0.0dev,>=1.28.0->google-cloud-dataproc<3.2.0,>=3.0.0->apache-beam[interactive]==2.43.0) (0.2.8)\n",
      "Requirement already satisfied: rsa<5,>=3.1.4 in /opt/conda/lib/python3.7/site-packages (from google-auth<3.0dev,>=1.25.0->google-api-core[grpc]<3.0.0dev,>=1.28.0->google-cloud-dataproc<3.2.0,>=3.0.0->apache-beam[interactive]==2.43.0) (4.9)\n",
      "Requirement already satisfied: cachetools<5.0,>=2.0.0 in /home/jupyter/.local/lib/python3.7/site-packages (from google-auth<3.0dev,>=1.25.0->google-api-core[grpc]<3.0.0dev,>=1.28.0->google-cloud-dataproc<3.2.0,>=3.0.0->apache-beam[interactive]==2.43.0) (4.2.4)\n",
      "Requirement already satisfied: nbconvert>=5 in /opt/conda/lib/python3.7/site-packages (from notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (7.2.6)\n",
      "Requirement already satisfied: argon2-cffi in /opt/conda/lib/python3.7/site-packages (from notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (21.3.0)\n",
      "Requirement already satisfied: Send2Trash>=1.8.0 in /opt/conda/lib/python3.7/site-packages (from notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.8.0)\n",
      "Requirement already satisfied: nbclassic>=0.4.7 in /opt/conda/lib/python3.7/site-packages (from notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.4.8)\n",
      "Requirement already satisfied: nbformat in /opt/conda/lib/python3.7/site-packages (from notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (5.7.0)\n",
      "Requirement already satisfied: prometheus-client in /opt/conda/lib/python3.7/site-packages (from notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.15.0)\n",
      "Requirement already satisfied: jinja2 in /opt/conda/lib/python3.7/site-packages (from notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (3.1.2)\n",
      "Requirement already satisfied: terminado>=0.8.3 in /opt/conda/lib/python3.7/site-packages (from notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.17.1)\n",
      "Requirement already satisfied: notebook-shim>=0.1.0 in /opt/conda/lib/python3.7/site-packages (from nbclassic>=0.4.7->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.2.2)\n",
      "Requirement already satisfied: jupyter-server>=1.8 in /opt/conda/lib/python3.7/site-packages (from nbclassic>=0.4.7->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.23.3)\n",
      "Requirement already satisfied: nbclient>=0.5.0 in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.7.2)\n",
      "Requirement already satisfied: beautifulsoup4 in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (4.11.1)\n",
      "Requirement already satisfied: importlib-metadata>=3.6 in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (5.1.0)\n",
      "Requirement already satisfied: bleach in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (5.0.1)\n",
      "Requirement already satisfied: defusedxml in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.7.1)\n",
      "Requirement already satisfied: jupyterlab-pygments in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.2.2)\n",
      "Requirement already satisfied: pandocfilters>=1.4.1 in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.5.0)\n",
      "Requirement already satisfied: markupsafe>=2.0 in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (2.1.1)\n",
      "Requirement already satisfied: tinycss2 in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.2.1)\n",
      "Requirement already satisfied: mistune<3,>=2.0.3 in /opt/conda/lib/python3.7/site-packages (from nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (2.0.4)\n",
      "Requirement already satisfied: jsonschema>=2.6 in /opt/conda/lib/python3.7/site-packages (from nbformat->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (4.17.3)\n",
      "Requirement already satisfied: fastjsonschema in /opt/conda/lib/python3.7/site-packages (from nbformat->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (2.16.2)\n",
      "Requirement already satisfied: pyasn1<0.5.0,>=0.4.6 in /opt/conda/lib/python3.7/site-packages (from pyasn1-modules>=0.2.1->google-auth<3.0dev,>=1.25.0->google-api-core[grpc]<3.0.0dev,>=1.28.0->google-cloud-dataproc<3.2.0,>=3.0.0->apache-beam[interactive]==2.43.0) (0.4.8)\n",
      "Requirement already satisfied: argon2-cffi-bindings in /opt/conda/lib/python3.7/site-packages (from argon2-cffi->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (21.2.0)\n",
      "Requirement already satisfied: zipp>=0.5 in /opt/conda/lib/python3.7/site-packages (from importlib-metadata>=3.6->nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (3.11.0)\n",
      "Requirement already satisfied: importlib-resources>=1.4.0 in /opt/conda/lib/python3.7/site-packages (from jsonschema>=2.6->nbformat->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (5.10.1)\n",
      "Requirement already satisfied: attrs>=17.4.0 in /opt/conda/lib/python3.7/site-packages (from jsonschema>=2.6->nbformat->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (22.1.0)\n",
      "Requirement already satisfied: pyrsistent!=0.17.0,!=0.17.1,!=0.17.2,>=0.14.0 in /opt/conda/lib/python3.7/site-packages (from jsonschema>=2.6->nbformat->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.19.2)\n",
      "Requirement already satisfied: pkgutil-resolve-name>=1.3.10 in /opt/conda/lib/python3.7/site-packages (from jsonschema>=2.6->nbformat->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.3.10)\n",
      "Requirement already satisfied: anyio<4,>=3.1.0 in /opt/conda/lib/python3.7/site-packages (from jupyter-server>=1.8->nbclassic>=0.4.7->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (3.6.2)\n",
      "Requirement already satisfied: websocket-client in /opt/conda/lib/python3.7/site-packages (from jupyter-server>=1.8->nbclassic>=0.4.7->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.4.2)\n",
      "Requirement already satisfied: cffi>=1.0.1 in /opt/conda/lib/python3.7/site-packages (from argon2-cffi-bindings->argon2-cffi->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.15.1)\n",
      "Requirement already satisfied: soupsieve>1.2 in /opt/conda/lib/python3.7/site-packages (from beautifulsoup4->nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (2.3.2.post1)\n",
      "Requirement already satisfied: webencodings in /opt/conda/lib/python3.7/site-packages (from bleach->nbconvert>=5->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (0.5.1)\n",
      "Requirement already satisfied: sniffio>=1.1 in /opt/conda/lib/python3.7/site-packages (from anyio<4,>=3.1.0->jupyter-server>=1.8->nbclassic>=0.4.7->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (1.3.0)\n",
      "Requirement already satisfied: pycparser in /opt/conda/lib/python3.7/site-packages (from cffi>=1.0.1->argon2-cffi-bindings->argon2-cffi->notebook>=4.4.1->widgetsnbextension~=3.6.0->ipywidgets<8,>=7.6.5->apache-beam[interactive]==2.43.0) (2.21)\n",
      "Collecting pyarrow==10.0.1\n",
      "  Downloading pyarrow-10.0.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (36.0 MB)\n",
      "\u001b[2K     \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m36.0/36.0 MB\u001b[0m \u001b[31m37.8 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m00:01\u001b[0m00:01\u001b[0m\n",
      "\u001b[?25hRequirement already satisfied: numpy>=1.16.6 in /opt/conda/lib/python3.7/site-packages (from pyarrow==10.0.1) (1.21.6)\n",
      "Installing collected packages: pyarrow\n",
      "  Attempting uninstall: pyarrow\n",
      "    Found existing installation: pyarrow 0.17.1\n",
      "    Uninstalling pyarrow-0.17.1:\n",
      "      Successfully uninstalled pyarrow-0.17.1\n",
      "\u001b[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.\n",
      "apache-beam 2.43.0 requires pyarrow<10.0.0,>=0.15.1, but you have pyarrow 10.0.1 which is incompatible.\n",
      "tfx-bsl 1.12.0 requires google-api-python-client<2,>=1.7.11, but you have google-api-python-client 2.70.0 which is incompatible.\n",
      "tfx-bsl 1.12.0 requires pyarrow<7,>=6, but you have pyarrow 10.0.1 which is incompatible.\n",
      "tfx-bsl 1.12.0 requires tensorflow<3,>=2.11, but you have tensorflow 2.6.5 which is incompatible.\n",
      "tensorflow-transform 1.12.0 requires pyarrow<7,>=6, but you have pyarrow 10.0.1 which is incompatible.\n",
      "tensorflow-transform 1.12.0 requires tensorflow<2.12,>=2.11.0, but you have tensorflow 2.6.5 which is incompatible.\u001b[0m\u001b[31m\n",
      "\u001b[0mSuccessfully installed pyarrow-10.0.1\n"
     ]
    }
   ],
   "source": [
     "!pip install --user apache-beam[interactive]==2.43.0\n",
     "!pip install pyarrow==10.0.1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**NOTE**: In the output of the above cell you can safely ignore any **WARNINGS** (in Yellow text) related to: \"hdfscli\", \"hdfscli-avro\", \"pbr\", \"fastavro\", \"gen_client\" and **ERRORS** (in Red text) related to the related to: \"witwidget-gpu\", \"fairing\" etc."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you get any related errors or warnings mentioned above please rerun the above cell."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Note**: Restart your kernel to use updated packages."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Make sure the Dataflow API is enabled by going to this [link](https://console.developers.google.com/apis/api/dataflow.googleapis.com). Ensure that you've installed Beam by importing it and printing the version number."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2.43.0\n"
     ]
    }
   ],
   "source": [
    "import apache_beam as beam\n",
    "print(beam.__version__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "TensorFlow version:  2.6.5\n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "print(\"TensorFlow version: \",tf.version.VERSION)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You may receive a `UserWarning` about the Apache Beam SDK for Python 3 as not being yet fully supported. Don't worry about this."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# change these to try this notebook out\n",
    "BUCKET = 'cloud-training-demos-ml'\n",
    "PROJECT = 'cloud-training-demos'\n",
    "REGION = 'us-central1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['BUCKET'] = BUCKET\n",
    "os.environ['PROJECT'] = PROJECT\n",
    "os.environ['REGION'] = REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "if ! gcloud storage ls | grep -q gs://${BUCKET}/; then\n",    "  gcloud storage buckets create --location ${REGION} gs://${BUCKET}\n",    "fi"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h2> Save the query from earlier </h2>\n",
    "\n",
    "The data is natality data (record of births in the US). My goal is to predict the baby's weight given a number of factors about the pregnancy and the baby's mother.  Later, we will want to split the data into training and eval datasets. The hash of the year-month will be used for that."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create SQL query using natality data after the year 2000\n",
    "query = \"\"\"\n",
    "SELECT\n",
    "  weight_pounds,\n",
    "  is_male,\n",
    "  mother_age,\n",
    "  plurality,\n",
    "  gestation_weeks,\n",
    "  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth\n",
    "FROM\n",
    "  publicdata.samples.natality\n",
    "WHERE year > 2000\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
     {
      "data": {
       "text/html": [
        "<div>\n",
        "<style scoped>\n",
        "    .dataframe tbody tr th:only-of-type {\n",
        "        vertical-align: middle;\n",
        "    }\n",
        "\n",
        "    .dataframe tbody tr th {\n",
        "        vertical-align: top;\n",
        "    }\n",
        "\n",
        "    .dataframe thead th {\n",
        "        text-align: right;\n",
        "    }\n",
        "</style>\n",
        "<table border=\"1\" class=\"dataframe\">\n",
        "  <thead>\n",
        "    <tr style=\"text-align: right;\">\n",
        "      <th></th>\n",
        "      <th>weight_pounds</th>\n",
        "      <th>is_male</th>\n",
        "      <th>mother_age</th>\n",
        "      <th>plurality</th>\n",
        "      <th>gestation_weeks</th>\n",
        "      <th>hashmonth</th>\n",
        "    </tr>\n",
        "  </thead>\n",
        "  <tbody>\n",
        "    <tr>\n",
        "      <th>0</th>\n",
        "      <td>7.568469</td>\n",
        "      <td>True</td>\n",
        "      <td>22</td>\n",
        "      <td>1</td>\n",
        "      <td>46</td>\n",
        "      <td>-1403073183891835564</td>\n",
        "    </tr>\n",
        "    <tr>\n",
        "      <th>1</th>\n",
        "      <td>8.807467</td>\n",
        "      <td>True</td>\n",
        "      <td>39</td>\n",
        "      <td>1</td>\n",
        "      <td>42</td>\n",
        "      <td>1088037545023002395</td>\n",
        "    </tr>\n",
        "    <tr>\n",
        "      <th>2</th>\n",
        "      <td>8.313632</td>\n",
        "      <td>True</td>\n",
        "      <td>23</td>\n",
        "      <td>1</td>\n",
        "      <td>35</td>\n",
        "      <td>-2126480030009879160</td>\n",
        "    </tr>\n",
        "    <tr>\n",
        "      <th>3</th>\n",
        "      <td>8.000575</td>\n",
        "      <td>False</td>\n",
        "      <td>27</td>\n",
        "      <td>1</td>\n",
        "      <td>40</td>\n",
        "      <td>-7170969733900686954</td>\n",
        "    </tr>\n",
        "    <tr>\n",
        "      <th>4</th>\n",
        "      <td>6.563162</td>\n",
        "      <td>False</td>\n",
        "      <td>29</td>\n",
        "      <td>1</td>\n",
        "      <td>39</td>\n",
        "      <td>3408502330831153141</td>\n",
        "    </tr>\n",
        "  </tbody>\n",
        "</table>\n",
        "</div>"
       ],
       "text/plain": [
        "   weight_pounds  is_male  mother_age  plurality  gestation_weeks  hashmonth  \\\n",
        "0       7.568469     True          22          1               46  -1403073183891835564   \n",
        "1       8.807467     True          39          1               42  108803754502300239   \n",
        "2       8.313632     True          23          1               35  -2126480030009879160   \n",
        "3       8.000575    False          27          1               40  -7170969733900686954  \n",
        "4       6.563162    False          29          1               39  3408502330831153141   \n"
       ]
      },
      "execution_count": 7,
      "metadata": {},
      "output_type": "execute_result"
     }
   ],
   "source": [
    "# Call BigQuery and examine in dataframe\n",
    "from google.cloud import bigquery\n",
    "df = bigquery.Client().query(query + \" LIMIT 100\").to_dataframe()\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h2> Create ML dataset using Dataflow </h2>\n",
    "Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files.\n",
    "\n",
    "Instead of using Beam/Dataflow, I had three other options:\n",
    "\n",
    "* Use Cloud Dataprep to visually author a Dataflow pipeline. Cloud Dataprep also allows me to explore the data, so we could have avoided much of the handcoding of Python/Seaborn calls above as well!\n",
    "* Read from BigQuery directly using TensorFlow.\n",
    "* Use the BigQuery console (http://bigquery.cloud.google.com) to run a Query and save the result as a CSV file. For larger datasets, you may have to select the option to \"allow large results\" and save the result into a CSV file on Google Cloud Storage. \n",
    "\n",
    "<p>\n",
    "\n",
    "However, in this case, I want to do some preprocessing, modifying data so that we can simulate what is known if no ultrasound has been performed. If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries on the user interface, so I am using Cloud Dataflow for the preprocessing.\n",
    "\n",
    "Note that after you launch this, the actual processing is happening on the cloud. Go to the GCP web console to the Dataflow section and monitor the running job. It took about 20 minutes for me.\n",
    "<p>\n",
    "If you wish to continue without doing this step, you can copy my preprocessed output:\n",
    "<pre>\n",
    "gcloud storage cp --recursive gs://cloud-training-demos/babyweight/preproc gs://your-bucket/\n",    "</pre>"
   ]
  },
  {
"cell_type": "markdown",
"metadata": {},
"source": [
    "**Lab Task #1:** Creating datasets for ML using Dataflow"
]
},
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
      "Launching Dataflow job preprocess-babyweight-features-200919-052122 ... hang on\n"
      ]
     }
   ],
   "source": [
    " # TODO 1\n",
    " # TODO -- Your code here.\n",
    "  if in_test_mode:\n",
    "      print('Launching local job ... hang on')\n",
    "      OUTPUT_DIR = './preproc'\n",
    "      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)\n",
    "      os.makedirs(OUTPUT_DIR)\n",
    "  else:\n",
    "      print('Launching Dataflow job {} ... hang on'.format(job_name))\n",
    "      OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)\n",
    "      try:\n",
    "        subprocess.check_call('gcloud storage rm --recursive {}'.format(OUTPUT_DIR).split())\n",    "      except:\n",
    "        pass\n",
    "\n",
    "  options = {\n",
    "      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),\n",
    "      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),\n",
    "      'job_name': job_name,\n",
    "      'project': PROJECT,\n",
    "      'region': REGION,\n",
    "      'teardown_policy': 'TEARDOWN_ALWAYS',\n",
    "      'no_save_main_session': True,\n",
    "      'num_workers': 4,\n",
    "      'max_num_workers': 5\n",
    "  }\n",
    "  opts = beam.pipeline.PipelineOptions(flags = [], **options)\n",
    "  if in_test_mode:\n",
    "      RUNNER = 'DirectRunner'\n",
    "  else:\n",
    "      RUNNER = 'DataflowRunner'\n",
    "  p = beam.Pipeline(RUNNER, options = opts)\n",
    "  query = \"\"\"\n",
    "SELECT\n",
    "  weight_pounds,\n",
    "  is_male,\n",
    "  mother_age,\n",
    "  plurality,\n",
    "  gestation_weeks,\n",
    "  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth\n",
    "FROM\n",
    "  publicdata.samples.natality\n",
    "WHERE year > 2000\n",
    "AND weight_pounds > 0\n",
    "AND mother_age > 0\n",
    "AND plurality > 0\n",
    "AND gestation_weeks > 0\n",
    "AND month > 0\n",
    "    \"\"\"\n",
    "\n",
    "  if in_test_mode:\n",
    "    query = query + ' LIMIT 100' \n",
    "\n",
    "  for step in ['train', 'eval']:\n",
    "    if step == 'train':\n",
    "      selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)\n",
    "    else:\n",
    "      selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)\n",
    "\n",
    "    (p \n",
    "     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))\n",
    "     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)\n",
    "     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))\n",
    "    )\n",
    "\n",
    "  job = p.run()\n",
    "  if in_test_mode:\n",
    "    job.wait_until_finish()\n",
    "    print(\"Done!\")\n",
    "    \n",
    "preprocess(in_test_mode = False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The above step will take 20+ minutes. Go to the GCP web console, navigate to the Dataflow section and <b>wait for the job to finish</b> before you run the following step."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Please re-run the above cell if you get a <b>failed status</b> of the job in the dataflow UI console."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
      "gs://qwiklabs-gcp-01-daba989dc3b1/babyweight/preproc/eval.csv-00000-of-00007\n",
      "gs://qwiklabs-gcp-01-daba989dc3b1/babyweight/preproc/train.csv-00000-of-00014\n"
      ]
     }
   ],
   "source": [
    "%%bash\n",
    "gcloud storage ls gs://${BUCKET}/babyweight/preproc/*-00000*"   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2022 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
